Example 1 - writing UDAFs the simple way

This small example shows how simple it could be to write a user-defined aggregate function (UDAF) in Spark with moderate additions to the existing API. It reuses the example published on the Databricks blog, which adds an operator for the harmonic mean.

Let's get done with some imports first:


In [1]:
# The main function
import karps as ks
# The standard library
import karps.functions as f
# Some tools to display the computation process:
from karps.display import show_phase

Here is the definition of the harmonic mean, which is a simple function. Given a column containing floating point values, it is defined as follows:


In [2]:
def harmonic_mean(col):
    count = f.as_double(f.count(col))
    inv_sum = 1.0/f.sum(1.0/col)
    return inv_sum * count
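
As a reminder, and assuming the usual textbook definition, the harmonic mean of n positive values is n divided by the sum of their inverses, which is what the function above computes as inv_sum * count. For the two-element input 1.0 and 2.0 it works out as follows:

```latex
H(x_1, \dots, x_n) = \frac{n}{\sum_{i=1}^{n} \frac{1}{x_i}},
\qquad
H(1, 2) = \frac{2}{\frac{1}{1} + \frac{1}{2}} = \frac{4}{3} \approx 1.3333
```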

This is exactly how one would want to code it in NumPy, in pandas, or with basic Spark constructs. In fact, you can run this code straight inside pandas:


In [3]:
# Using Pandas to evaluate our function:
import pandas as pd
pandas_df = pd.DataFrame([1.0, 2.0])
harmonic_mean(pandas_df)


Out[3]:
1.3333333333333333

However, this code has a number of problems if you want to use it in Spark:

  • reusability: this function works great on a whole dataframe or on a single column, but it cannot be reused with groupby, for instance.
  • performance: most Spark tutorials will teach you that, as it stands, this function has crappy performance. It will compute the input twice (once for the count and once for the sum), which may be very expensive in some cases.
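
The performance point can be illustrated without Spark. The sketch below is plain Python, not Karps or Spark code: CountingSource is a hypothetical stand-in for an expensive dataset that simply counts how many times it is scanned from start to end.

```python
class CountingSource:
    """Hypothetical stand-in for an expensive dataset: counts full scans."""

    def __init__(self, values):
        self._values = list(values)
        self.scans = 0

    def __iter__(self):
        # Every new iteration models one full pass over the data.
        self.scans += 1
        return iter(self._values)


def naive_harmonic_mean(source):
    count = float(sum(1 for _ in source))    # first scan: the count
    inv_sum = sum(1.0 / v for v in source)   # second scan: the sum of inverses
    return count / inv_sum


src = CountingSource([1.0, 2.0])
naive_result = naive_harmonic_mean(src)
print(naive_result, src.scans)  # the scan counter ends at 2
```

Each aggregation triggers its own pass over the input, which is why uncached data ends up being computed twice.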

This is why the usual advice is to call Spark's cache function before using it, which still requires all the data to stay materialized. Karps provides the convenient autocache operator, which automatically decides whether caching is appropriate. We are going to use it on this simple example:


In [4]:
# Create a HUGE dataframe
df = ks.dataframe([1.0, 2.0], name="my_input")
df


Out[4]:
/my_input@org.spark.DistributedLiteral:double

In [5]:
# And apply our function:
cached_df = f.autocache(df)
hmean = harmonic_mean(cached_df)
hmean


Out[5]:
/multiply6!org.spark.LocalStructuredTransform:double

Something to immediately note is that the computation is lazy: nothing gets computed and all you get is an object called multiply6 of type double. Let's compute it. Thanks to lazy evaluation, the Karps compiler can rearrange the computations to make them run faster:
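
To make the idea of lazy evaluation concrete, here is a minimal plain-Python sketch. LazyValue and literal are hypothetical names invented for this illustration; they are not part of the Karps API, and Karps nodes carry much more information than this.

```python
class LazyValue:
    """Hypothetical deferred value: records an operation and runs it on demand."""

    def __init__(self, name, fn, *parents):
        self.name = name
        self.fn = fn
        self.parents = parents
        self.evaluations = 0

    def __mul__(self, other):
        # Building the product only creates a new node; nothing is computed.
        return LazyValue("multiply", lambda a, b: a * b, self, other)

    def compute(self):
        self.evaluations += 1
        return self.fn(*(p.compute() for p in self.parents))

    def __repr__(self):
        return f"/{self.name}:lazy"


def literal(name, value):
    return LazyValue(name, lambda: value)


inv_sum = literal("inv_sum", 1.0 / 1.5)
count = literal("count", 2.0)
lazy_hmean = inv_sum * count                 # a graph node, not a number
evaluations_before = inv_sum.evaluations     # still 0 at this point
lazy_result = lazy_hmean.compute()           # the work happens only here
print(lazy_hmean, evaluations_before, lazy_result)
```

Because the whole graph exists before anything runs, a compiler gets the chance to rewrite it first.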


In [6]:
# All computations happen within a session, which keeps track of the state in Spark.
s = ks.session("demo1")

The compute function not only triggers the computation, but also provides more debugging information about what is happening. We are going to introspect the compiler passes to see how things get transformed.


In [7]:
comp = s.compute(hmean)

Here is the initial graph of computation, as we built it. Click on the nodes to have more detailed information.

It is very clear that two computations are going to be run in parallel from the same dataset, and that caching will happen right before forking these computations.


In [8]:
show_phase(comp, "initial")



In [9]:
show_phase(comp, "MERGE_PREAGG_AGGREGATIONS")


The important part to notice though is that after the count1 and the sum4 nodes, all the other nodes are observables (local values). They do not involve distributed datasets anymore, so they are very cheap to compute. The Karps compiler is going to optimize the distributed part to reduce the amount of computation; everything after that is not important for now.

One of the first phases merges the inverse3 node into a single lineage, and then fuses the aggregations sum4 and count1 into a single joint aggregation. If you look at the graph below, the new nodes sum4 and count1 are in fact dummy projections that operate on local data. All the hard work is being done in a new node with a horrible name: autocache0_ks_aggstruct....
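
The effect of fusing the two aggregations can be sketched in plain Python. This is only an illustration of the idea under simplifying assumptions, not the code that Karps generates: each partition is reduced once to a pair (sum of inverses, count), the partial pairs are merged, and the final arithmetic runs on local values.

```python
from functools import reduce


def partial_aggregate(values):
    """One pass over one partition: returns (sum of inverses, count)."""
    return reduce(
        lambda acc, v: (acc[0] + 1.0 / v, acc[1] + 1),
        values,
        (0.0, 0),
    )


def merge(left, right):
    """Combine the partial results of two partitions."""
    return (left[0] + right[0], left[1] + right[1])


def harmonic_mean_fused(partitions):
    inv_sum, count = reduce(merge, map(partial_aggregate, partitions), (0.0, 0))
    # Cheap local arithmetic, the equivalent of the observable nodes.
    return (1.0 / inv_sum) * float(count)


# Iterators can be consumed only once, so this works only with a single pass.
partitions = [iter([1.0]), iter([2.0])]
fused_result = harmonic_mean_fused(partitions)
print(fused_result)
```

The pair plays the role of the joint aggregation, and picking out its first or second element corresponds to the dummy projections that remain under the names sum4 and count1.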


In [10]:
show_phase(comp, "MERGE_AGGREGATIONS")


Now that we only perform a single aggregation, do we still need to cache the data? We don't! The next compiler phase is going to inspect the autocache nodes, and see how many times they get to be aggregated, and remove them if possible. In this case, it correctly infers that we do not need this autocache0 operator. Here is the final graph that gets executed:
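
The decision rule described above can be sketched on a toy graph. Everything here is an assumption made for illustration: the dictionary encoding, the prune_caches helper, and the node name fused_agg (a placeholder for the generated aggregation node) are hypothetical, and the real compiler pass is likely more involved. A cache node is kept only when more than one node consumes it.

```python
def prune_caches(graph, cache_nodes):
    """graph maps each node name to the list of its parent names."""
    consumers = {name: 0 for name in graph}
    for parents in graph.values():
        for parent in parents:
            consumers[parent] += 1
    # A cache read at most once buys nothing, so it can be removed.
    removable = {n for n in cache_nodes if consumers[n] <= 1}

    def resolve(name):
        # Skip removed cache nodes (assumed to have exactly one parent).
        while name in removable:
            name = graph[name][0]
        return name

    return {
        name: [resolve(p) for p in parents]
        for name, parents in graph.items()
        if name not in removable
    }


before_merge = {
    "my_input": [],
    "autocache0": ["my_input"],
    "inverse3": ["autocache0"],
    "sum4": ["inverse3"],
    "count1": ["autocache0"],
}
after_merge = {
    "my_input": [],
    "autocache0": ["my_input"],
    "fused_agg": ["autocache0"],
    "sum4": ["fused_agg"],
    "count1": ["fused_agg"],
}

kept = prune_caches(before_merge, {"autocache0"})
pruned = prune_caches(after_merge, {"autocache0"})
print("autocache0" in kept, "autocache0" in pruned)
```

Before the aggregations are merged, the cache feeds two branches and stays; once a single aggregation reads it, it is dropped and the aggregation reads my_input directly.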


In [11]:
show_phase(comp, "final")


More work could be done to simplify the local nodes, but this is outside the scope of this first project.

To conclude, we wrote some minimalistic, poorly performing code in Python. Karps turned it into high-performance operations that can then be optimized easily by the Spark SQL engine. In fact, this code is faster in practice than a UDAF because it can be directly understood by Tungsten. In addition, this function can be reused inside aggregations with no change, as we will see.

In summary, Karps lets you write the code you want to write, and turns it into a program that is:

  • faster (sometimes as fast as or faster than manually crafted code)
  • reusable and easy to compose
  • easy to introspect thanks to TensorBoard
  • easy to test independently

And to get the actual values:


In [12]:
comp.values()


Out[12]:
(double, double_value: 1.3333333333333333
)

Or, in short, if you do not want to see what is happening:


In [13]:
s.eval(hmean)


Out[13]:
1.3333333333333333

In [14]:
show_phase(comp, "parsed")



In [15]:
show_phase(comp, "physical")



In [16]:
show_phase(comp, "rdd")



In [17]:
comp.dump_profile("karps_trace_1.json")
